
[draft] [fix] [client] Fix IO buffer overflow when resend msg after producer reconnect #21351


Open · wants to merge 1 commit into base: master

Conversation

poorbarcode (Contributor)

Motivation

If sendAsync is called with too many messages while the producer is reconnecting, then after the connection is re-established all of the messages cached in memory are flushed to the socket at once, and the IO buffer overflows.

Got exception java.lang.IllegalStateException: buffer queue length overflow: 2143977581 + 4195072
	at io.netty.channel.AbstractCoalescingBufferQueue.incrementReadableBytes(AbstractCoalescingBufferQueue.java:368)
	at io.netty.channel.AbstractCoalescingBufferQueue.add(AbstractCoalescingBufferQueue.java:100)
	at io.netty.channel.AbstractCoalescingBufferQueue.add(AbstractCoalescingBufferQueue.java:84)
	at io.netty.handler.ssl.SslHandler.write(SslHandler.java:758)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
	at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
	at org.apache.pulsar.client.impl.ProducerImpl.recoverProcessOpSendMsgFrom(ProducerImpl.java:2283)
	at org.apache.pulsar.client.impl.ProducerImpl.lambda$resendMessages$17(ProducerImpl.java:1918)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
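For context on the numbers in the exception: it is thrown by Netty's AbstractCoalescingBufferQueue, which tracks the queued readable bytes in an int, and the two operands in the message already sit just below Integer.MAX_VALUE. A quick standalone check (not part of the PR), using the values copied from the exception message:

    public class OverflowArithmetic {
        public static void main(String[] args) {
            long queued = 2_143_977_581L; // bytes already queued, from the exception message
            long next   = 4_195_072L;     // size of the next write, from the exception message
            System.out.println(queued + next);     // 2148172653
            System.out.println(Integer.MAX_VALUE); // 2147483647, so an int counter would wrap
        }
    }

Flushing everything buffered during the reconnect in one go therefore pushes the counter past what the queue can represent, which is exactly the condition the exception guards against.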

Modifications

  • To avoid IO buffer overflow, split the resend into multiple flushes.
  • TODO: add a test

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode self-assigned this Oct 12, 2023
@poorbarcode poorbarcode added the type/bug label (The PR fixed a bug or issue reported a bug) Oct 12, 2023
@poorbarcode poorbarcode added this to the 3.2.0 milestone Oct 12, 2023
@poorbarcode poorbarcode changed the title from "[fix] [client] Fix IO buffer overflow when resend msg after producer reconnect" to "[draft] [fix] [client] Fix IO buffer overflow when resend msg after producer reconnect" Oct 12, 2023
@github-actions github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) Oct 12, 2023
@@ -2284,12 +2285,18 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
if (stripChecksum) {
    stripChecksum(op);
}
// To avoid IO buffer overflow, split to multi-flush.
if (messageBytesSizeInCache + op.cmd.readableBytes() < messageBytesSizeInCache) {
    cnx.ctx().flush();
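A note on the condition above: assuming messageBytesSizeInCache is an int that accumulates the bytes written since the last flush, the check relies on Java's two's-complement wrap-around, where adding two non-negative ints yields a result smaller than the first operand exactly when the true sum exceeds Integer.MAX_VALUE. A minimal standalone illustration (not part of the PR), reusing the values from the stack trace:

    public class WrapAroundCheck {
        public static void main(String[] args) {
            int cached = 2_143_977_581; // bytes written since the last flush
            int next   = 4_195_072;     // readable bytes of the next command
            // The sum wraps negative, so the comparison is true and a flush would be triggered.
            System.out.println(cached + next < cached); // true
        }
    }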
Member:

flush is an async operation, so this might not work as expected

poorbarcode (Contributor, Author):

Since write and flush are executed on the same thread, both operations keep their order of execution, so it will work as expected.

Member:

I mean it from a different angle. I guess that in this case the purpose of flushing is to ensure that buffers don't overflow. It feels like the solution in this PR won't fully address that, since all operations will be queued up in any case unless the logic that calls write also runs in the ctx loop (thread). Another possibility would be to wait for the flush to complete before continuing.
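One way to realize the "wait for flush completion" alternative without blocking the event loop is to chain the next chunk of resends off the ChannelFuture returned by writeAndFlush. This is only a rough sketch of that idea; ChunkedResender and its queue of pre-sliced chunks are hypothetical and not part of the PR:

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import java.util.Queue;

    final class ChunkedResender {
        private final Queue<ByteBuf> chunks; // hypothetical queue of pre-sliced resend chunks

        ChunkedResender(Queue<ByteBuf> chunks) {
            this.chunks = chunks;
        }

        void resendNext(ChannelHandlerContext ctx) {
            ByteBuf chunk = chunks.poll();
            if (chunk == null) {
                return; // everything has been resent
            }
            ChannelFuture flushed = ctx.writeAndFlush(chunk);
            flushed.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    resendNext(ctx); // continue only after the previous flush has completed
                } else {
                    ctx.close();     // abort the resend on write failure
                }
            });
        }
    }

Since the listener runs on the channel's event loop, the writes stay ordered while only one chunk at a time sits in the outbound buffer.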

Member:

In Netty, there's the Channel.isWritable method and channelWritabilityChanged callback that help in keeping the queued writes bounded (see low/high watermark options). However, IIRC, Pulsar code base doesn't show examples of how write logic could be implemented to take advantage of this type of backpressure solution.
Optimally, the logic would add more writes only while the channel is writable and pause otherwise; adding writes should resume once the channel becomes writable again.
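For reference, here is a minimal sketch of the writability-based pattern described above (not the PR's code; the pending queue and its contents are hypothetical): writes are added only while Channel.isWritable() returns true, and channelWritabilityChanged resumes draining once the outbound buffer falls back below the low watermark.

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import java.util.Queue;

    final class WritabilityAwareResender extends ChannelInboundHandlerAdapter {
        private final Queue<ByteBuf> pending; // hypothetical queue of messages waiting to be resent

        WritabilityAwareResender(Queue<ByteBuf> pending) {
            this.pending = pending;
        }

        /** Write while the outbound buffer stays under the high watermark, then flush. */
        void drain(ChannelHandlerContext ctx) {
            while (ctx.channel().isWritable()) {
                ByteBuf next = pending.poll();
                if (next == null) {
                    break; // nothing left to resend
                }
                ctx.write(next);
            }
            ctx.flush();
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            if (ctx.channel().isWritable()) {
                drain(ctx); // resume once the buffer drained below the low watermark
            }
            ctx.fireChannelWritabilityChanged();
        }
    }

The low/high thresholds come from the channel's WriteBufferWaterMark, configurable via ChannelOption.WRITE_BUFFER_WATER_MARK.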

@lhotari (Member) left a comment:

I don't think that flushing will help resolve the issue.


@merlimat (Contributor):

I'm still not sure how it gets to 2 GB of accumulated data. The client memory limit backpressure should have blocked sends long before reaching 2 GB.
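The client memory limit referred to here is the per-client cap set on the client builder; once producers hold that much pending message data, further sends are expected to block or fail instead of buffering without bound, which is why backpressure would normally kick in well before 2 GB. A minimal configuration sketch (the service URL and limit value are illustrative only):

    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.SizeUnit;

    public class ClientMemoryLimitExample {
        public static void main(String[] args) throws PulsarClientException {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")   // illustrative URL
                    .memoryLimit(64, SizeUnit.MEGA_BYTES)    // cap on pending message memory for this client
                    .build();
            client.close();
        }
    }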

Labels
  • doc-not-needed: Your PR changes do not impact docs
  • release/2.10.6
  • release/2.11.4
  • release/3.0.12
  • type/bug: The PR fixed a bug or issue reported a bug
Projects

None yet

6 participants